package com.atguigu.day07;

import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Comparator;

public class Flink06_KeyedState_ListState {
    public static void main(String[] args) throws Exception {
        //1.获取流的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2.从端口读取数据
        KeyedStream<WaterSensor, Tuple> keyedStream = env.socketTextStream("localhost", 9999)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        String[] split = value.split(",");
                        return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                    }
                })
                .keyBy("id");

        //TODO 3.针对每个传感器输出最高的3个水位值
        SingleOutputStreamOperator<String> result = keyedStream.process(new KeyedProcessFunction<Tuple, WaterSensor, String>() {

            //TODO 定义状态
            private ListState<Integer> listState;


            //TODO 初始化状态
            @Override
            public void open(Configuration parameters) throws Exception {
                listState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("list-State", Integer.class));
            }

            @Override
            public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                //1.将当前数据存入状态
                listState.add(value.getVc());

                //2.再取出状态中的数据
                Iterable<Integer> vcS = listState.get();

                //3.将数据再放到List集合中排序
                ArrayList<Integer> vcList = new ArrayList<>();
                for (Integer vc : vcS) {
                    vcList.add(vc);
                }

                //4.判断集合中的数据是否大于三个，大于三个则由大到小排序
                if (vcList.size()>3){
                    vcList.sort(new Comparator<Integer>() {
                        @Override
                        public int compare(Integer o1, Integer o2) {
                            return o2-o1;
                        }
                    });

                    //将下标为3的元素也就是最小的元素删除掉
                    vcList.remove(3);
                }

                //5.将排序后的集合中的数据写入状态
                listState.update(vcList);

                out.collect(vcList.toString());
            }
        });

        result.print();

        env.execute();
    }
}
